/* -------------------------------------------------------------------------
 *
 * hashinsert.cpp
 *	  Item insertion in hash tables for Postgres.
 *
 * Portions Copyright (c) 2021 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/gausskernel/storage/access/hash/hashinsert.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/heapam.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "storage/lock/lwlock.h"
#include "storage/buf/buf_internals.h"

static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, RelFileNode hnode);

/*
 *	_hash_doinsert() -- Handle insertion of a single index tuple.
 *
 * This routine is called by the public interface routines, hashbuild
 * and hashinsert.  By here, itup is completely filled in.
 */
void _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
{
    Buffer buf;
    Buffer bucket_buf;
    Buffer metabuf;
    HashMetaPage metap;
    HashMetaPage usedmetap = NULL;
    Page metapage;
    Page page;
    HashPageOpaque pageopaque;
    Size itemsz;
    bool do_expand = false;
    uint32 hashkey;
    Bucket bucket;
    OffsetNumber itup_off;
    /*
     * Get the hash key for the item (it's stored in the index tuple itself).
     */
    hashkey = _hash_get_indextuple_hashkey(itup);

    /* compute item size too */
    itemsz = IndexTupleDSize(*itup);
    itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
                                * need to be consistent */
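    /* e.g. with 8-byte maximum alignment, MAXALIGN rounds a 20-byte itemsz up to 24 */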

restart_insert:

    /*
     * Read the metapage.  We don't lock it yet; HashMaxItemSize() will
     * examine pd_pagesize_version, but that can't change so we can examine it
     * without a lock.
     */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
    metapage = BufferGetPage(metabuf);

    /*
     * Check whether the item can fit on a hash page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.)  Note that at this point,
     * itemsz doesn't include the ItemId.
     *
     * XXX this is useless code if we are only storing hash keys.
     */
    if (itemsz > (Size)HashMaxItemSize(metapage))
        ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                        errmsg("index row size %lu exceeds hash maximum %lu", (unsigned long)itemsz,
                               (unsigned long)HashMaxItemSize(metapage)),
                        errhint("Values larger than a buffer page cannot be indexed.")));

    /* Lock the primary bucket page for the target bucket. */
    buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE, &usedmetap);
    Assert(usedmetap != NULL);
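
    /*
     * For reference, the key-to-bucket mapping applied above is essentially
     * the following (a sketch of _hash_hashkey2bucket):
     *
     *     bucket = hashkey & hashm_highmask;
     *     if (bucket > hashm_maxbucket)
     *         bucket = bucket & hashm_lowmask;
     *
     * so a key always maps to a bucket that has actually been allocated,
     * even while the table is partway through a doubling.
     */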

    /* remember the primary bucket buffer so we can release its pin at the end */
    bucket_buf = buf;

    page = BufferGetPage(buf);
    pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
    bucket = pageopaque->hasho_bucket;

    /*
     * If this bucket is in the process of being split, try to finish the
     * split before inserting, because that might create room for the
     * insertion to proceed without allocating an additional overflow page.
     * It's only interesting to finish the split if we're trying to insert
     * into the bucket from which we're removing tuples (the "old" bucket),
     * not if we're trying to insert into the bucket into which tuples are
     * being moved (the "new" bucket).
     */
    if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf)) {
        /* release the lock on the bucket buffer before completing the split */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);

        _hash_finish_split(rel, metabuf, buf, bucket,
                           usedmetap->hashm_maxbucket,
                           usedmetap->hashm_highmask,
                           usedmetap->hashm_lowmask);

        /* release the pins on the old bucket and meta buffers, then retry the insert */
        _hash_dropbuf(rel, buf);
        _hash_dropbuf(rel, metabuf);
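
        /*
         * Note: we restart from scratch because we released our locks and
         * pins above; by the time we retry, the metapage contents and the
         * key's target bucket may have changed.
         */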
        goto restart_insert;
    }

    /* Do the insertion */
    while (PageGetFreeSpace(page) < itemsz) {
        BlockNumber nextblkno;

        /*
         * Check whether the current page has any DEAD tuples.  If so, delete
         * them and see whether that frees enough space for the new item
         * before moving on to the next page in the bucket chain.
         */
        if (H_HAS_DEAD_TUPLES(pageopaque)) {
            if (IsBufferCleanupOK(buf)) {
                _hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);

                if (PageGetFreeSpace(page) >= itemsz)
                    break;        /* OK, now we have enough space */
            }
        }

        /*
         * no space on this page; check for an overflow page
         */
        nextblkno = pageopaque->hasho_nextblkno;

        if (BlockNumberIsValid(nextblkno)) {
            /*
             * An overflow page exists; go get it.  If it doesn't have room,
             * we'll find out on the next pass through the loop test above.
             * We always release both the lock and pin if this is an overflow
             * page, but only the lock if this is the primary bucket page,
             * since the pin on the primary bucket must be retained throughout
             * the scan.
             */
            if (buf != bucket_buf)
                _hash_relbuf(rel, buf);
            else
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
            buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
            page = BufferGetPage(buf);
        } else {
            /*
             * We're at the end of the bucket chain and haven't found a page
             * with enough room, so allocate a new overflow page.
             */

            /* release our write lock without modifying buffer */
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);

            /* chain to a new overflow page */
            buf = _hash_addovflpage(rel, metabuf, buf, buf == bucket_buf);
            page = BufferGetPage(buf);

            /* should fit now, given test above */
            Assert(PageGetFreeSpace(page) >= itemsz);
        }
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
        Assert(pageopaque->hasho_bucket == bucket);
    }
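
    /* By here, buf is pinned and write-locked and has room for the tuple. */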

    /*
     * Write-lock the metapage so we can increment the tuple count. After
     * incrementing it, check to see if it's time for a split.
     */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

    /* Do the update.  No ereport(ERROR) until changes are logged */
    START_CRIT_SECTION();

    /* found page with enough space, so add the item here */
    itup_off = _hash_pgaddtup(rel, buf, itemsz, itup);
    MarkBufferDirty(buf);

    /* metapage operations */
    metap = HashPageGetMeta(metapage);
    metap->hashm_ntuples += 1;

    /* Make sure this stays in sync with _hash_expandtable() */
    do_expand = metap->hashm_ntuples > (double)metap->hashm_ffactor * (metap->hashm_maxbucket + 1);
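    /*
     * Worked example: with hashm_ffactor = 75 and hashm_maxbucket = 3 (four
     * buckets in use), a split is requested once the index holds more than
     * 75 * 4 = 300 tuples.
     */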

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel)) {
        xl_hash_insert xlrec;
        XLogRecPtr recptr;

        xlrec.offnum = itup_off;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHashInsert);

        XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

        XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
        XLogRegisterBufData(0, (char *) itup, IndexTupleDSize(*itup));
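
        /*
         * Block 0 carries the tuple itself so that redo can re-insert it;
         * block 1 is the metapage, whose tuple count redo re-increments.
         */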

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);

        PageSetLSN(BufferGetPage(buf), recptr);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    /* drop lock on metapage, but keep pin */
    LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

    /*
     * Release the modified buffer, and make sure the pin on the primary
     * bucket page is also released.
     */
    _hash_relbuf(rel, buf);
    if (buf != bucket_buf)
        _hash_dropbuf(rel, bucket_buf);

    /* Attempt to split if a split is needed */
    if (do_expand)
        _hash_expandtable(rel, metabuf);

    /* Finally drop our pin on the metapage */
    _hash_dropbuf(rel, metabuf);
}

/*
 *	_hash_pgaddtup() -- add a tuple to a particular page in the index.
 *
 * This routine adds the tuple to the page as requested; it does not write out
 * the page.  It is an error to call this function without a pin and write
 * lock on the target buffer.
 *
 * Returns the offset number at which the tuple was inserted.  This function
 * is responsible for preserving the condition that tuples in a hash index
 * page are sorted by hashkey value.
 */
OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
{
    OffsetNumber itup_off;
    Page page;
    uint32 hashkey;

    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    page = BufferGetPage(buf);

    /* Find where to insert the tuple (preserving page's hashkey ordering) */
    hashkey = _hash_get_indextuple_hashkey(itup);
    itup_off = _hash_binsearch(page, hashkey);
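    /*
     * _hash_binsearch returns the offset of the first entry whose hash key
     * is >= hashkey (or the page's max offset plus one if there is none),
     * so inserting at itup_off preserves the ordering.
     */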
    if (PageAddItem(page, (Item)itup, itemsize, itup_off, false, false) == InvalidOffsetNumber)
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                        errmsg("failed to add index item to \"%s\"", RelationGetRelationName(rel))));

    return itup_off;
}

/*
 *	_hash_pgaddmultitup() -- add a tuple vector to a particular page in the index.
 *
 * This routine has same requirements for locking and tuple ordering as
 * _hash_pgaddtup().
 *
 * The offset numbers at which the tuples were inserted are returned through
 * the caller-supplied itup_offsets array.
 */
void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups, OffsetNumber *itup_offsets, uint16 nitups)
{
    OffsetNumber itup_off;
    Page page;
    uint32 hashkey;
    int i;

    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    page = BufferGetPage(buf);

    for (i = 0; i < nitups; i++) {
        Size itemsize;

        itemsize = IndexTupleDSize(*itups[i]);
        itemsize = MAXALIGN(itemsize);

        /* Find where to insert the tuple (preserving page's hashkey ordering) */
        hashkey = _hash_get_indextuple_hashkey(itups[i]);
        itup_off = _hash_binsearch(page, hashkey);

        itup_offsets[i] = itup_off;

        if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false) == InvalidOffsetNumber)
            ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                            errmsg("failed to add index item to \"%s\"", RelationGetRelationName(rel))));
    }
}

/*
 *	_hash_vacuum_one_page() -- vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The caller must hold a
 * cleanup lock on the buffer being modified before calling this function.
 */
static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf, RelFileNode hnode)
{
    OffsetNumber deletable[MaxOffsetNumber];
    int ndeletable = 0;
    OffsetNumber offnum;
    OffsetNumber maxoff;
    Page page = BufferGetPage(buf);
    HashPageOpaque pageopaque;
    HashMetaPage metap;

    /* Scan each tuple on the page to see if it is marked LP_DEAD */
    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) {
        ItemId itemId = PageGetItemId(page, offnum);

        if (ItemIdIsDead(itemId))
            deletable[ndeletable++] = offnum;
    }

    if (ndeletable > 0) {
        /*
         * Write-lock the meta page so that we can decrement the tuple count.
         */
        LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        PageIndexMultiDelete(page, deletable, ndeletable);

        /*
         * Mark the page as not containing any LP_DEAD items.  This is not
         * necessarily true (some items may have been marked dead recently
         * but weren't included in our target-item list), but it will almost
         * always be true, and it doesn't seem worth an additional page scan
         * to verify.  Remember that LH_PAGE_HAS_DEAD_TUPLES is only a hint
         * anyway.
         */
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
        pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

        metap = HashPageGetMeta(BufferGetPage(metabuf));
        metap->hashm_ntuples -= ndeletable;

        MarkBufferDirty(buf);
        MarkBufferDirty(metabuf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel)) {
            xl_hash_vacuum_one_page xlrec;
            XLogRecPtr recptr;

            xlrec.hnode = hnode;
            xlrec.ntuples = ndeletable;

            XLogBeginInsert();
            XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
            XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);

            /*
             * We need the target-offsets array whether or not we store the
             * whole buffer, to allow us to find the latestRemovedXid on a
             * standby server.
             */
            XLogRegisterData((char *) deletable,
                             ndeletable * sizeof(OffsetNumber));

            XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);

            PageSetLSN(BufferGetPage(buf), recptr);
            PageSetLSN(BufferGetPage(metabuf), recptr);
        }

        END_CRIT_SECTION();

        /*
         * Release the write lock on the meta page now that we have updated
         * the tuple count.
         */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
    }
}
